{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Name\n",
    "Submitting a Cloud Machine Learning Engine training job as a pipeline step\n",
    "\n",
    "# Label\n",
    "GCP, Cloud ML Engine, Machine Learning, pipeline, component, Kubeflow, Kubeflow Pipeline\n",
    "\n",
    "# Summary\n",
    "A Kubeflow Pipeline component to submit a Cloud ML Engine training job as a step in a pipeline.\n",
    "\n",
    "# Details\n",
    "## Intended use\n",
    "Use this component to submit a training job to Cloud ML Engine from a Kubeflow Pipeline. \n",
    "\n",
    "## Runtime arguments\n",
    "| Argument | Description | Optional | Data type | Accepted values | Default |\n",
    "|:------------------|:------------------|:----------|:--------------|:-----------------|:-------------|\n",
    "| project_id | The ID of the Google Cloud Platform (GCP) project of the job. | No | GCPProjectID |  |  |\n",
    "| python_module | The name of the Python module to run after installing the training program. | Yes | String |  | None |\n",
    "| package_uris | The Cloud Storage location of the packages that contain the training program and any additional dependencies. The maximum number of package URIs is 100. | Yes | List |  | None |\n",
    "| region | The Compute Engine region in which the training job is run. | Yes | GCPRegion |  | us-central1 |\n",
    "| args | The command line arguments to pass to the training program. | Yes | List |  | None |\n",
    "| job_dir | A Cloud Storage path in which to store the training outputs and other data needed for training. This path is passed to your TensorFlow program as the `job-dir` command-line argument. The benefit of specifying this field is that Cloud ML validates the path for use in training. | Yes | GCSPath |  | None |\n",
    "| python_version | The version of Python used in training. If it is not set, the default version is 2.7. Python 3.5 is available when the runtime version is set to 1.4 and above. | Yes | String |  | None |\n",
    "| runtime_version | The runtime version of Cloud ML Engine to use for training. If it is not set, Cloud ML Engine uses the default. | Yes | String |  | 1 |\n",
    "| master_image_uri | The Docker image to run on the master replica. This image must be in Container Registry. | Yes | GCRPath |  | None |\n",
    "| worker_image_uri | The Docker image to run on the worker replica. This image must be in Container Registry. | Yes | GCRPath |  | None |\n",
    "| training_input | The input parameters to create a training job. | Yes | Dict | [TrainingInput](https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#TrainingInput) | None |\n",
    "| job_id_prefix | The prefix of the job ID that is generated. | Yes | String |  | None |\n",
    "| job_id | The ID of the job to create, takes precedence over generated job id if set. | Yes | String | - | None |\n",
    "| wait_interval | The number of seconds to wait between API calls to get the status of the job. | Yes | Integer |  | 30 |\n",
    "\n",
    "\n",
    "\n",
    "## Input data schema\n",
    "\n",
    "The component accepts two types of inputs:\n",
    "*   A list of Python packages from Cloud Storage.\n",
    "    * You can manually build a Python package and upload it to Cloud Storage by following this [guide](https://cloud.google.com/ml-engine/docs/tensorflow/packaging-trainer#manual-build).\n",
    "*   A Docker container from Container Registry. \n",
    "    * Follow this [guide](https://cloud.google.com/ml-engine/docs/using-containers) to publish and use a Docker container with this component.\n",
    "\n",
    "## Output\n",
    "| Name    | Description                 | Type      |\n",
    "|:------- |:----                        | :---      |\n",
    "| job_id  | The ID of the created job.  |  String   |\n",
    "| job_dir | The Cloud Storage path that contains the trained model output files. |  GCSPath  |\n",
    "\n",
    "\n",
    "## Cautions & requirements\n",
    "\n",
    "To use the component, you must:\n",
    "\n",
    "*   Set up a cloud environment by following this [guide](https://cloud.google.com/ml-engine/docs/tensorflow/getting-started-training-prediction#setup).\n",
    "*   The component can authenticate to GCP. Refer to [Authenticating Pipelines to GCP](https://www.kubeflow.org/docs/gke/authentication-pipelines/) for details.\n",
    "*   Grant the following access to the Kubeflow user service account: \n",
    "    *   Read access to the Cloud Storage buckets which contain the input data, packages, or Docker images.\n",
    "    *   Write access to the Cloud Storage bucket of the output directory.\n",
    "\n",
    "## Detailed description\n",
    "\n",
    "The component builds the [TrainingInput](https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#TrainingInput) payload and submits a job via the [Cloud ML Engine REST API](https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs).\n",
    "\n",
    "The steps to use the component in a pipeline are:\n",
    "\n",
    "\n",
    "1.  Install the Kubeflow Pipeline SDK:\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "\n",
    "KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar.gz'\n",
    "!pip3 install $KFP_PACKAGE --upgrade"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2. Load the component using KFP SDK"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.components as comp\n",
    "\n",
    "mlengine_train_op = comp.load_component_from_url(\n",
    "    'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/ml_engine/train/component.yaml')\n",
    "help(mlengine_train_op)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Sample\n",
    "Note: The following sample code works in an IPython notebook or directly in Python code.\n",
    "\n",
    "In this sample, you use the code from the [census estimator sample](https://github.com/GoogleCloudPlatform/cloudml-samples/tree/master/census/estimator) to train a model in Cloud ML Engine. To upload the code to Cloud ML Engine, package the Python code and upload it to a Cloud Storage bucket. \n",
    "\n",
    "Note: You must have read and write permissions on the bucket that you use as the working directory.\n",
    "#### Set sample parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "# Required Parameters\n",
    "PROJECT_ID = '<Please put your project ID here>'\n",
    "GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional Parameters\n",
    "EXPERIMENT_NAME = 'CLOUDML - Train'\n",
    "TRAINER_GCS_PATH = GCS_WORKING_DIR + '/train/trainer.tar.gz'\n",
    "OUTPUT_GCS_PATH = GCS_WORKING_DIR + '/train/output/'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Clean up the working directory"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "!gsutil rm -r $GCS_WORKING_DIR"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Download the sample trainer code to local"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "!wget https://github.com/GoogleCloudPlatform/cloudml-samples/archive/master.zip\n",
    "!unzip master.zip"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Package code and upload the package to Cloud Storage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "%%bash -s \"$TRAINER_GCS_PATH\"\n",
    "pushd ./cloudml-samples-master/census/estimator/\n",
    "python setup.py sdist\n",
    "gsutil cp dist/preprocessing-1.0.tar.gz $1\n",
    "popd\n",
    "rm -fr ./cloudml-samples-master/ ./master.zip ./dist"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Example pipeline that uses the component"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.dsl as dsl\n",
    "import json\n",
    "@dsl.pipeline(\n",
    "    name='CloudML training pipeline',\n",
    "    description='CloudML training pipeline'\n",
    ")\n",
    "def pipeline(\n",
    "    project_id = PROJECT_ID,\n",
    "    python_module = 'trainer.task',\n",
    "    package_uris = json.dumps([TRAINER_GCS_PATH]),\n",
    "    region = 'us-central1',\n",
    "    args = json.dumps([\n",
    "        '--train-files', 'gs://cloud-samples-data/ml-engine/census/data/adult.data.csv',\n",
    "        '--eval-files', 'gs://cloud-samples-data/ml-engine/census/data/adult.test.csv',\n",
    "        '--train-steps', '1000',\n",
    "        '--eval-steps', '100',\n",
    "        '--verbosity', 'DEBUG'\n",
    "    ]),\n",
    "    job_dir = OUTPUT_GCS_PATH,\n",
    "    python_version = '',\n",
    "    runtime_version = '1.10',\n",
    "    master_image_uri = '',\n",
    "    worker_image_uri = '',\n",
    "    training_input = '',\n",
    "    job_id_prefix = '',\n",
    "    job_id = '',\n",
    "    wait_interval = '30'):\n",
    "    task = mlengine_train_op(\n",
    "        project_id=project_id, \n",
    "        python_module=python_module, \n",
    "        package_uris=package_uris, \n",
    "        region=region, \n",
    "        args=args, \n",
    "        job_dir=job_dir, \n",
    "        python_version=python_version,\n",
    "        runtime_version=runtime_version, \n",
    "        master_image_uri=master_image_uri, \n",
    "        worker_image_uri=worker_image_uri, \n",
    "        training_input=training_input, \n",
    "        job_id_prefix=job_id_prefix,\n",
    "        job_id=job_id,\n",
    "        wait_interval=wait_interval)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Compile the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_func = pipeline\n",
    "pipeline_filename = pipeline_func.__name__ + '.zip'\n",
    "import kfp.compiler as compiler\n",
    "compiler.Compiler().compile(pipeline_func, pipeline_filename)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Submit the pipeline for execution"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Specify pipeline argument values\n",
    "arguments = {}\n",
    "\n",
    "#Get or create an experiment and submit a pipeline run\n",
    "import kfp\n",
    "client = kfp.Client()\n",
    "experiment = client.create_experiment(EXPERIMENT_NAME)\n",
    "\n",
    "#Submit a pipeline run\n",
    "run_name = pipeline_func.__name__ + ' run'\n",
    "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Inspect the results\n",
    "\n",
    "Use the following command to inspect the contents in the output directory:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gsutil ls $OUTPUT_GCS_PATH"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## References\n",
    "* [Component python code](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/component_sdk/python/kfp_component/google/ml_engine/_train.py)\n",
    "* [Component docker file](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/Dockerfile)\n",
    "* [Sample notebook](https://github.com/kubeflow/pipelines/blob/master/components/gcp/ml_engine/train/sample.ipynb)\n",
    "* [Cloud Machine Learning Engine job REST API](https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs)\n",
    "\n",
    "## License\n",
    "By deploying or using this software you agree to comply with the [AI Hub Terms of Service](https://aihub.cloud.google.com/u/0/aihub-tos) and the [Google APIs Terms of Service](https://developers.google.com/terms/). To the extent of a direct conflict of terms, the AI Hub Terms of Service will control."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
